\section{基于Spark的图划分}

Apache Spark是一个开源集群运算框架，最初是由加州大学柏克莱分校AMPLab所开发。
相对于Hadoop的MapReduce会在运行完工作后将中介数据存放到磁盘中，Spark使用了存储器内运算技术，能在数据尚未写入硬盘时即在存储器内分析运算。
Spark在存储器内运行程序的运算速度能做到比Hadoop MapReduce的运算速度快上100倍，即便是运行程序于硬盘时，Spark也能快上10倍速度。

Apache Spark项目包含下列几项：弹性分布式数据集（RDDs）、Spark SQL、Spark Streaming、MLlib和GraphX。
Spark提供了分布式任务调度，调度和基本的I/O功能。
Spark的基础程序抽象是弹性分布式数据集（RDDs），RDD一个可以并行操作、有容错机制的数据集合。
RDDs可以透过引用外部存储系统的数据集创建（例如：共享文件系统、HDFS、HBase或其他 Hadoop 数据格式的数据源），
或者是透过在现有RDDs的转换而创建（比如：map、filter、reduce、join等等）。 

\subsection{图数据实现}

为了在Spark上进行图划分，本文将先介绍\texttt{Node}和\texttt{Graph}这两个类的基本情况。
\texttt{Graph}类主要用\texttt{edgeRDD}和\texttt{nodeRDD}表示以及进行分布式存储和计算。

\begin{itemize}
    \item \texttt{edgeRDD}的每一条记录用\texttt{sourceNode,targetNode,weight,isMatch}表示。
    这四个部分组成的意义分别是：起点节点Id值、终点节点Id值、边权重、是否已经被匹配。
    
    \item \texttt{nodeRDD}的每一条记录是一个\texttt{Node}类，\texttt{Node}类的属性主要包括：
    idxneighbour、E、I、partition、chosen(是否被KL算法交换过)、由哪些点组成（用于Metis算法中）、isMark(是否是匹配点)、weight（用于Metis组合点）。

\end{itemize}

\texttt{Graph}类和\texttt{Node}类包含了大量的基础函数，包括
\begin{itemize}
    \item 根据输入的\texttt{edge}，构建\texttt{nodeRDD}
    \item 根据分区结果计算\texttt{Node}的属性，包括$E$和$I$
    \item 交换了两个点之后，应该如何调整$E$和$I$
    \item 图划分指标的计算
    \item $\cdots$
\end{itemize}

\subsubsection{\texttt{Node}类}

\texttt{Node}类是图数据处理的基础。

\begin{table}[htbp]
    \centering
    \caption{\texttt{Node}类主要属性}
    \begin{tabular}{ccc}
        \hline
        属性& 类型 & 定义\\
        \hline
        idx         & String             & 节点的唯一Id \\
        neighbour   & Map[String,Double] & 节点的所有近邻节点\\
        E           & Double             & 外部权重，即节点与其他子图内的节点的连接权重和\\
        I           & Double             & 内部权重，即节点与本子图内的节点的连接权重和\\
        partition   & Int                & 节点所在子图的Id\\
        chosen      & Boolean            & 节点是否与其他子图的节点交换过\\
        composition & List[Node]         & 该节点的组成节点列表（仅用于节点聚合/拆分过程）\\
        composLevel & Int                & 该节点的聚合程度（仅用于节点聚合/拆分过程）\\
        isMark      & Boolean            & 节点是否与其他节点匹配过 \\
        weight      & Double             & 节点的权重\\
        \hline
        \centering
    \end{tabular}
\end{table}

\subsubsection{\texttt{Graph}类}

\texttt{Graph}类是将节点和边组合起来。

\begin{table}[htbp]
    \centering
    \caption{\texttt{Graph}类主要属性}
    \begin{tabular}{ccc}
        \hline
        属性& 类型 & 定义\\
        \hline
        nodeNum & Long & 图内节点的个数 \\
        edgeRDD & RDD[(Str, Str, Double, Bool)] & 图内所有边数据的RDD形式\\
        nodeRDD & RDD[Node] & 图内所有节点的RDD形式\\
        \hline
        \centering
    \end{tabular}
\end{table}


\subsection{哈希划分算法实现}

通过map算子将\texttt{graph}里面的\texttt{nodeRDD}转化为(idx,分区号)键值类型的RDD。

根据输入的划分数$k$，对\texttt{Node}的Id重新进行哈希划分，具体公式是$hash(idx)\%k$。

通过在Spark executor中的taskcontext得到每一条记录的分区号，得到(idx,分区号)的键值对RDD形式。
然后调用\texttt{graph.buildPartitionGraph}方法，根据得到的分区信息更新\texttt{graph}的\texttt{nodeRDD}信息。

\begin{lstlisting}[language=Scala]
import org.apache.spark.{HashPartitioner, TaskContext}
import util.Graph

object HashGraphPartition {
    def partition(graph: Graph, partitions: Int): Graph = {
        val assigenment = graph.nodeRDD.map(x => (x.getIdx, 0)).partitionBy(
            new HashPartitioner(partitions)).map(x => (x._1, TaskContext.getPartitionId))
        graph.buildPartitionGraph(assigenment)
    }
}
\end{lstlisting}

\subsection{谱聚类算法实现}

谱聚类的核心思想是最小化子图之间的连边权重。
具体做法是对拉普拉斯矩阵求最小的$k$个特征值对应特征向量。
为了控制子图的规模，需要用度矩阵（相似度矩阵的行和）进行归一化。
这里有两个挑战：
\begin{enumerate}
    \item 对拉普拉斯矩阵进行特征值分解的复杂度非常高（$O(N^3)$），而且很难实现并行化。
    \item 在Metis算法中，如果每个\texttt{node}的权重不一样，很可能产生不平衡分割。
\end{enumerate}

第一个挑战的解决方案可以参考Frank Lin和William W.Cohen发表于ICML 2010的论文，他们提出的幂迭代算法将特征值分解转化为矩阵的迭代乘积。

因为矩阵乘积的并行化在Spark底层已经实现并且优化，所以并行化更加简单。
具体来说，就是在数据归一化的逐对相似矩阵上，使用截断的幂迭代，寻找数据集的一个超低维嵌入。
这种嵌入恰好是很有效的聚类指标，使它在真实数据集上总是好于广泛使用的谱聚类方法。
算法的伪代码如下：

\begin{algorithm}[htbp]
\caption{PIC算法流程}
\SetAlgoLined
\KwIn{按行归一化的关联矩阵$W$}
\KwIn{期望聚类数$k$}
随机选取一个非零初始向量$v^0$ \\
\Repeat{$\left|\delta^{t}-\delta^{t-1}\right| \simeq 0$}{
    $ v^{(t+1)}=\frac{W v^{(t)}}{\left|W v^{(t)}\right|_{1}} $\\
    $\delta^{(t+1)}=\left|v^{(t+1)}-v^{(t)}\right|$
    增加$t$值
}
使用k-means算法对向量$v^t$中的点进行聚类
\KwOut{类$C_1,C_2,\cdots,C_K$}
\end{algorithm}

\subsection{Kernighan-Lin算法实现}

Kernighan-Lin算法的思想迭代贪心优化。
对于给定的图划分，Kernighan-Lin算法尝试交换不同分区的点，计算交换带来的增益，选择增益大于$0$并且最大的点来交换，所有点对交换的增益都小于等于$0$。
对于Spark分布式计算而言，KL算法首先会根据输入的初始化图划分计算，构建划分图。

本文提供了好几种构建划分图的模式，这里介绍两种——
\begin{enumerate}
    \item 基于\texttt{nodeRDD}的方法，也就是针对\texttt{nodeRDD}已经存在的情况构建
    \item 基于\texttt{edgeRDD}的方法，也就是\texttt{nodeRDD}还不存在的情况
\end{enumerate}

基于\texttt{nodeRDD}的核心是根据邻居是否与自己在一个分区来计算这个节点跟内部和外部的连接权重。具体的Scala代码如下：

\begin{lstlisting}[language=Scala]
def buildPartitionGraph():Graph={
    if(this.nodeRDD==null) buildGraph()
    val map_idx_partition = this.nodeRDD.map(x=>(x.getIdx,x.getPartition)).collectAsMap()
    this.nodeRDD=this.nodeRDD.map(
        x=>{
            val neighbour = x.getNeighbour
            var E = 0.0
            var I = 0.0
            for (elem <- neighbour) {
                if(map_idx_partition.contains(elem._1)){
                    if(map_idx_partition(elem._1)==x.getPartition)
                        I+=elem._2
                    else
                        E+=elem._2
                }
            }
            x.setE(E).setI(I)
        }
    )
    this
}
\end{lstlisting}
基于\texttt{nodeRDD}的划分方法的瓶颈在于在数据量很大的时候，map表查询的方法存在性能瓶颈。
更好的方法是通过join来实现节点邻居和分区信息的聚合，具体Scala代码如下：
\begin{lstlisting}[language=Scala]
val neighbour_idx_weight:RDD[(String,(String,Double,Int))] = this.nodeRDD.map(
            x=>(
                    x.getIdx,
                    x.getNeighbour.map(y=>(x.getIdx,y._1,y._2,x.getPartition)),
            )).flatMap(_._2).map(x=>(x._2,(x._1,x._3,x._4)))
        val idx_partition:RDD[(String,Int)] = this.nodeRDD.map(x=>(x.getIdx,x.getPartition))
        val idx_neighbour_partition =
            neighbour_idx_weight.join(idx_partition).
                    map(x=>((x._2._1._1,x._2._1._3),(x._1,x._2._1._2,x._2._2)))
        val idxNeighbourPartition =
            idx_neighbour_partition.groupByKey().map(x=>(x._1._1,x._2.toList,x._1._2))
\end{lstlisting}
随后KL算法会更新\texttt{nodeRDD}里面的各个属性，特别是$E$和$I$，$E$和$I$在KL算法执行过程中会迭代更新。

在迭代的每一步，KL算法首先是确定最大增益的点。
它的优点是足够准确，需要的迭代次数较少，缺点是每次需要计算大量点对的增益。
在Spark分布式实现上，具体做法是求出\texttt{nodeRDD}的笛卡尔积，去除一些没必要计算的点之后求出点对之间的增益。
只保留大于$0$的增益，然后通过reduce算子求出增大增益的点对。

\begin{lstlisting}[language=Scala]
def getMaxGain(nodeUnChosen: RDD[Node]): (Node, Node, Double) = {
    val node_gain = nodeUnChosen.cartesian(nodeUnChosen).filter(
        x => x._1.getPartition != x._2.getPartition
    ).map(x => {
        (x._1, x._2, x._1.swapGain(x._2))
    }).persist()

    val pos_gain = node_gain.filter(_._3 > 0)
    if (pos_gain.isEmpty()) null
    else pos_gain.reduce((x, y) => {
        if (x._3 >= y._3) x else y
    })
}
\end{lstlisting}

针对最大增益需要计算大量点对的情况，本项目提出了两种优化思路：

\begin{enumerate}
    \item Stochastic Max Gain
    每次只找到增益大于$0$的点就交换，类似于机器学习里面的随机梯度下降，整个优化过程非常曲折。
    这种策略每次交换的增益很低，比较难收敛。

    \item Mini-Partition Max Gain
    可以把\texttt{nodeRDD}或者\texttt{nodeRDD}的笛卡尔积分成很多个分区，每次随机取一个分区计算最大增益的点对进行交换。
    这个策略借用了机器学习里面对Mini Batch的数据进行Gradient Descent的精神，不过暂时没有实现。
\end{enumerate}

选择完交换的点之后，需要更新相关的点的$E$和$I$。
具体做法是调用\texttt{graph}类的\texttt{swapUpdate}方法，内部通过用map算子。
对\texttt{nodeRDD}每一个节点记录分布式调用\texttt{Node}的\texttt{swapUpdate}方法，内部通过用map算子。
\texttt{Node}的\texttt{swapUpdate}方法会根据点与交换两个点的连接关系更新$E$和$I$，每一次交换，程序都会记录性能的变化，便于观察。
% 伪代码如下

% \begin{lstlisting}[language=Scala]
% graph.swapUpdate的细节是调用点的swapUpdate方法：
% this.nodeRDD = this.nodeRDD.map(
%             x => x.swapUpdate(swap_node_a, swap_node_b)
%         )
% \end{lstlisting}

% input:node d
% out put:update node d
% if 节点d是节点a本身
%     d.E = a.I+edgeWeight(a,b)
%     d.I = a.E-edgeWeight(a,b)
% else if 节点d是节点b本身
%     d.E = b.I+edgeWeight(a,b)
%     d.I = b.E-edgeWeight(a,b)
% else if 节点d跟节点a在一个分区
%     d.E = d.I-edgeWeight(d,a)+edgeWeight(d,b)
%     d.I = d.E+edgeWeight(d,a)-edgeWeight(d,b)
% else if 节点d跟节点b在一个分区
%     d.E = d.I+edgeWeight(d,a)-edgeWeight(d,b)
%     d.I = d.E-edgeWeight(d,a)+edgeWeight(d,b)

对应的scala代码为：

\begin{lstlisting}[language=Scala]
if (is_node_a)
    return this.setE(I_a + weight_ab).setI(E_a - weight_ab).
            setChosen(true).setPartition(swap_node_b.getPartition)
if (is_node_b)
    return this.setE(I_b + weight_ab).setI(E_b - weight_ab).
            setChosen(true).setPartition(swap_node_a.getPartition)

val weight_a = this.edgeWeight(swap_node_a)
val weight_b = this.edgeWeight(swap_node_b)

if (weight_a == 0.0 && weight_b == 0.0) return this

if (in_a_graph)
    this.setI(
        this.getI - weight_a + weight_b
    ).setE(
        //这些点的E增加了与a连接的权重
        this.getE + weight_a - weight_b
    )
else
    this.setI(
        this.getI + weight_a - weight_b
    ).setE(
        //这些点的E增加了与a连接的权重
        this.getE - weight_a + weight_b
    )
\end{lstlisting}

\subsection{Metis算法实现}

Metis算法主要是通过最大匹配，合并匹配点，来实现图的粗化。
得到粗化的图之后调用一些图划分算法（比如谱聚类），将图划分为k个子图。
最后对图进行细化，也就是将原来粗化的图一步步还原回来。
由于在粗化的图上面进行图划分会有信息丢失，还原之后得到图划分不一定是最优的图划分。
因此Metis算法需要用Kernighan-Lin算法实现微调。

\subsubsection{粗化}

Metis的粗化过程会一直执行，直到图中的点小于$c*k$个，其中$k$是程序输入，表示划分成几个图，$c$是超参数，一般取$10~15$。

\begin{lstlisting}
while(graph.nodeNum<c*k)
   找出图中所有的最大匹配边
   合并最大匹配的边对应的两个点
   更新节点和边的状态
\end{lstlisting}

其中最大匹配的伪代码如下：

\begin{lstlisting}
while(还存在可以匹配的边)
    找出下一条匹配的边
    合并两个点，更新nodeRDD以及edgeRDD
\end{lstlisting}

其中的核心是找最大匹配边，这里采用了两种策略，一种是Heavy Edge，一种是Random Heavy Edge。
\begin{itemize}
    \item Heavy Edge算法每次通过reduce算子找到两个端点都没有匹配过的边里面最大权重的边。这里使用了filter和reduce算子
    \item Random Heavy Edge算法每次随机找一个点，找出与这个点连接的所有边的最大权重。这里使用了takeSample、filter与reduce算子
\end{itemize}

Heavy Edge的缺点是每次只找最大的匹配边，粗化过程需要多次调用这样的最大匹配算法，容易形成匹配边聚集，影响划分效果。
相比之下，Random Heavy Edge通过随机采样，避免匹配点聚集，让划分更加均衡，但是依赖随机采样的数据点。

\subsubsection{粗化过程中的数据结构更新}

实时更新\texttt{nodeRDD}以及\texttt{edgeRDD}的核心思路是通过filter和map算子的重复计算。

首先求出A和B的共同近邻节点，对于节点A和节点B都连接到的节点C，设置$edgeWeight=edgeWeight(AC)+edgeWeight(BC)$，此处使用了map算子。

同时新建一个\texttt{Node}类，设置其近邻为A与B的并集，组成节点列表composition由nodeA和nodeB组成，新节点的权重为原来的节点A和节点B的权重之和。
表示是否匹配的变量\texttt{isMark}设置为\texttt{true}。

在进行节点合并时，首先删去节点B，然后把节点A替换为新生成的节点。
这里通过map算子处理\texttt{nodeRDD}，对于节点A，直接重新赋值为new Node。
根据new Node得到一个\texttt{edgeMap}，是(起点，终点)到权重的映射，表示需要更新的边，用于更新\texttt{edgeRDD}。

接下来需要处理与节点A和节点B相连的其他节点。
\begin{itemize}
    \item 对于与A和B都连接的点D
        连接到new Node的\texttt{edgeWeight}更新为\texttt{edgeWeight(DA)+edgeWeight(DB)}
    \item 对于与只与A连接的点D
        连接到new Node的\texttt{edgeWeight}更新为$edgeWeight(DA)$
    \item 对于与只与B连接的点D
        连接到new Node的\texttt{edgeWeight}更新为$edgeWeight(DB)$
\end{itemize}

然后更新\texttt{edgeRDD}，用map算子更新每一条记录，更新权重。
每条边只要有一个点被匹配，\texttt{isMark}参数就被设置为\texttt{true}。

合并两个节点的邻居的Scala代码如下：

\begin{lstlisting}[language=Scala]
unionMap = nodeA.neighbour++nodeB.neighbour
intersetNeighbour = nodeA.neighbour.keySet & nodeB.neighbour.keySet
unionMap.map(
   x=>if(x in intersetNeighbour)
    (x._1,nodeA.edgeWeight(x._1) + nodeB.edgeWeight(x._1))
)
\end{lstlisting}

node RDD在更新过程中会维护一个需要更新的边表（需要更新的边较少，内存可以存下来），更新的Scala代码如下：

\begin{lstlisting}[language=Scala]
    
val newNode = mergeNode(node1,node2,level)
var neighbourEdgeMap: Map[(String,String),Double] =
    newNode.getNeighbour.map(x=>((newNode.getIdx,x._1),x._2))

graph.nodeRDD = graph.nodeRDD.filter(
    _.getIdx!=node2.getIdx
).map(x=>
    if(x.getIdx==node1.getIdx) newNode
    else{
        val weight = x.edgeWeight(node1)+x.edgeWeight(node2)

        if(weight!=0) {
            neighbourEdgeMap += (x.getIdx,newNode.getIdx)->weight
            x.popNeighbour(node1).popNeighbour(node2).
                    pushNeighbour((newNode.getIdx,weight))
        }
        else x
    }
)
\end{lstlisting}

\texttt{edgeRDD}可以根据之前需要更新的边表来更新，Scala代码如下

\begin{lstlisting}[language=Scala]
graph.edgeRDD = graph.edgeRDD.filter
    { x =>
        !((x._1 == node1.getIdx && x._2 == node2.getIdx) ||
                (x._1 == node2.getIdx) && (x._2 == node1.getIdx))
    }

graph.edgeRDD=graph.edgeRDD.map(
    x=>
            //which node are node1,node2
        if(x._1==node1.getIdx||x._1==node2.getIdx)
            (newNode.getIdx,x._2,x._3,true)
        else if(x._2==node1.getIdx||x._2==node2.getIdx)
            (x._1,newNode.getIdx,x._3,true)
        else x
).map(
    x=>
        if(neighbourEdgeMap.contains((x._1,x._2)))
            (x._1,x._2,neighbourEdgeMap((x._1,x._2)),true)
        else x
).distinct()

\end{lstlisting}

另外，粗化过程的最大匹配每次都是组合两个点，需要记录这两个点的信息。
在\texttt{Node}类的属性上加入了\texttt{composition}和\texttt{compositionLevel}属性，分别表示这个节点由那两个节点组合而成，以及是在粗化的哪一个阶段组合而成。

\subsubsection{初步划分}

初步划分主要通过调用谱聚类完成。
前面提到Heavy Edge策略的时候，空间上容易形成聚集，而这些聚集的点与其他点的连接权重还会增加，所以这些点很容易聚成一类，解决方法是对拉普拉斯矩阵用点的weight进行归一化

谱聚类算法涉及到度矩阵的时候，使用度矩阵归一化，目标是求$D^{-1/2}LD^{-1/2}$的特征值和特征向量。
现在归一化节点权重也采用相似的思路，$L$矩阵每一个项变为：
$$L_{ij}/sqrt(D_i)sqrt(D_j)weight(i)weight(j)$$

实验显示均衡性的改善还不够，这里使用节点权重$\alpha$次方进行归一化：
$$L_{ij}/sqrt(D_i)sqrt(D_j)weight(i)^{\alpha} weight(j)^{\alpha}$$

对应的矩阵形式为:
$$W_{node}^{-\alpha}D^(-1/2)LD^(-1/2)W_{node}^{-\alpha}$$
实现过程中发现$\alpha=2$效果较好。

\subsubsection{细化过程}

细化过程本质上是递归的分解组合点，是粗化的逆过程，类似一个堆栈的出栈过程。
需要根据每个点记录的\texttt{compositionLevel}属性确定“出栈”的顺序，直到\texttt{compositionLevel}说明无需继续细化。
细化过程是针对\texttt{nodeRDD}展开的，用flatMap算子对当前细化层次的节点分解其\texttt{composition}，得到两个新的点。
对于不需要分解的点直接拷贝过来。
通过合并两部分\texttt{nodeRDD}得到细化图的\texttt{nodeRDD}，然后调用KL算法微调。
本项目实现的KL算法可以支持$K$路划分。
scala代码如下

\begin{lstlisting}[language=Scala]
var refinedGraph = graph
var refineLevel = level

//refine Level = 0 indicates that nodes all
while(refineLevel!=0){
    /** Step 1: Split coarsen node to refined nodes. */

    // 1.1 Find coarsen nodes and refine them (new node)
    var refinedNodeRDD = refinedGraph.nodeRDD.filter(x=>x.getComposLevel==refineLevel)

    refinedNodeRDD = refinedNodeRDD.map(_.setCompositionPartition())

    // 1.2 Save the nodes which don't need to refine
    val nodeRDD = refinedGraph.nodeRDD.filter(x=>x.getComposLevel!=refineLevel)
    refinedNodeRDD = refinedNodeRDD.flatMap(x=>x.getComposition)

    // 1.3 Union two parts of refined nodes.
    refinedGraph.nodeRDD = refinedNodeRDD.union(nodeRDD)


    /** Step 2: Partitioning */
    val assignment = refinedGraph.nodeRDD.map(x=>(x.getIdx,x.getPartition))

    refinedGraph = KernighanLin.partition(refinedGraph, assignment, needMaxGain = true)
    refinedGraph.nodeRDD = refinedGraph.nodeRDD.map(_.setChosen(false))

    refineLevel-=1 //refine Level decrease 1, up refine
}
\end{lstlisting}
